﻿using CqCore.Extensions;
using CqCore.Logging;
using CqCore.Messaging;
using Newtonsoft.Json;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using RabbitmqService.Extensions;
using RabbitmqService.Options;
using System;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

namespace RabbitmqService
{
    public class RabbitmqSubscriber : IMessageSubscriber
    {
        readonly RabbitmqSubscriberOption options = null;
        readonly ILogRecorder logger = null;

        IConnection connection = null;
        IModel channel = null;

        Type handlingType { get; set; }
        Func<object, Task> handler { get; set; }

        public bool ConnectionOpened => (connection != null) && connection.IsOpen;
        public bool ChannelOpened => (channel != null) && channel.IsOpen;

        public RabbitmqSubscriber(RabbitmqSubscriberOption options, ILogRecorder<RabbitmqSubscriber> logger)
        {
            this.options = options;
            this.logger = logger;
        }

        private void Connection_ConnectionShutdown(object sender, ShutdownEventArgs e)
        {
            logger.Warn(new EventData
            {
                Type = "RabbitmqConnection",
                Message = "RabbitMQ Connection Shutdown",
                Labels = {
                    ["HostName"] = options.HostName,
                    ["Port"] = $"{ options.Port }",
                    ["QueueName"] = options.QueueName
                }
            });
        }
        private void Connection_RecoverySucceeded(object sender, EventArgs e)
        {
            logger.Warn(new EventData
            {
                Type = "RabbitmqConnectionRecovery",
                Message = "RabbitMQ Connection Recovery Succeeded",
                Labels = {
                    ["HostName"] = options.HostName,
                    ["Port"] = $"{ options.Port }",
                    ["QueueName"] = options.QueueName
                }
            });
        }
        private void Connection_ConnectionRecoveryError(object sender, ConnectionRecoveryErrorEventArgs e)
        {
            logger.Warn(new EventData
            {
                Type = "RabbitmqConnectionRecovery",
                Message = "RabbitMQ Connection Recovery Error",
                Labels = {
                    ["HostName"] = options.HostName,
                    ["Port"] = $"{ options.Port }",
                    ["QueueName"] = options.QueueName
                }
            });
        }
        private void Consumer_Shutdown(object sender, ShutdownEventArgs e)
        {
            logger.Warn(new EventData
            {
                Type = "RabbitmqConsumer",
                Message = "RabbitMQ Consumer Shutdown",
                Labels = {
                    ["HostName"] = options.HostName,
                    ["Port"] = $"{ options.Port }",
                    ["QueueName"] = options.QueueName
                }
            });
        }

        private void StartConnection()
        {
            if (ConnectionOpened)
            {
                connection.Close();
            }

            connection = new ConnectionFactory()
            {
                HostName = options.HostName,
                VirtualHost = options.VirtualHost,
                Port = options.Port,
                UserName = options.UserName,
                Password = options.Password,
                RequestedHeartbeat = 10,
                AutomaticRecoveryEnabled = true
            }.CreateConnection(options.ClientProvidedName ?? AppDomain.CurrentDomain.GetApplicationName());

            connection.ConnectionShutdown += Connection_ConnectionShutdown;
            connection.RecoverySucceeded += Connection_RecoverySucceeded;
            connection.ConnectionRecoveryError += Connection_ConnectionRecoveryError;

            logger.Debug(new EventData
            {
                Type = "RabbitmqConnection",
                Message = "RabbitMQ Connection Successfully",
                Labels = {
                    ["HostName"] = options.HostName,
                    ["Port"] = $"{ options.Port }",
                    ["QueueName"] = options.QueueName
                }
            });
        }
        private void StartChannel()
        {
            if (ChannelOpened)
            {
                channel.Close();
            }

            StartConnection();

            channel = connection.CreateModel();
            channel.BasicQos(0, options.PrefetchCount, false);

            logger.Debug(new EventData
            {
                Type = "RabbitmqChannel",
                Message = "RabbitMQ Channel Successfully",
                Labels = {
                    ["HostName"] = options.HostName,
                    ["Port"] = $"{ options.Port }",
                    ["QueueName"] = options.QueueName
                }
            });
        }
        private void StartConsumer()
        {
            StartChannel();

            var consumer = new EventingBasicConsumer(channel);
            consumer.Shutdown += Consumer_Shutdown;
            consumer.Received += async (sender, e) =>
            {
                await ReceiveMessage(sender, e);
            };
            channel.BasicConsume(options.QueueName, false, consumer);

            logger.Debug(new EventData
            {
                Type = "RabbitmqConsumer",
                Message = "RabbitMQ Consumer Successfully",
                Labels = {
                    ["HostName"] = options.HostName,
                    ["Port"] = $"{ options.Port }",
                    ["QueueName"] = options.QueueName
                }
            });
        }
        private void Start()
        {
            if (ConnectionOpened && ChannelOpened)
            {
                logger.Warn(new EventData
                {
                    Type = "RabbitmqStart",
                    Message = "RabbitMQ Connection and Channel is open",
                    Labels = {
                        ["HostName"] = options.HostName,
                        ["Port"] = $"{ options.Port }",
                        ["QueueName"] = options.QueueName
                    }
                });

                return;
            }

            try
            {
                StartConsumer();

                logger.Info(new EventData
                {
                    Type = "RabbitmqStart",
                    Message = "RabbitMQ Start Successfully",
                    Labels = {
                        ["HostName"] = options.HostName,
                        ["Port"] = $"{ options.Port }",
                        ["QueueName"] = options.QueueName
                    }
                });
            }
            catch (Exception ex)
            {
                logger.Warn(new EventData
                {
                    Type = "RabbitmqStart",
                    Message = ex.GetBaseException().Message,
                    Labels = {
                        ["HostName"] = options.HostName,
                        ["Port"] = $"{ options.Port }",
                        ["QueueName"] = options.QueueName
                    }
                });

                Thread.Sleep(TimeSpan.FromSeconds(options.StartRetryInterval));

                Start();
            }
        }

        public Task StartAsync()
        {
            return Task.Run(() => Start());
        }

        private async Task<bool> HandleMessage(object message)
        {
            try
            {
                await handler(message);

                logger.Debug(new EventData
                {
                    Type = "RabbitmqHandleMessage",
                    Message = "RabbitMQ Handle Message Completed",
                    Labels = {
                        ["QueueName"] = options.QueueName,
                        ["Message"] = JsonConvert.SerializeObject(message)
                    }
                });

                return true;
            }
            catch (Exception ex)
            {
                logger.Error(new EventData
                {
                    Type = "RabbitmqHandleMessage",
                    Message = ex.GetBaseException().Message,
                    Labels = {
                        ["QueueName"] = options.QueueName,
                        ["Message"] = JsonConvert.SerializeObject(message)
                    }
                });

                return false;
            }
        }
        private async Task ReceiveMessage(object sender, BasicDeliverEventArgs e)
        {
            logger.Debug(new EventData
            {
                Type = "RabbitmqReceiveMessage",
                Message = "RabbitMQ Receive Message Completed",
                Labels = {
                    ["QueueName"] = options.QueueName,
                    ["Topic"] = e.RoutingKey,
                    ["Headers"] = JsonConvert.SerializeObject(e.BasicProperties.Headers),
                    ["Message"] = Encoding.UTF8.GetString(e.Body)
                }
            });

            var success = false;

            //var message = MessagePackSerializer.NonGeneric.Deserialize(messageType, args.Body);
            try
            {
                var message = JsonConvert.DeserializeObject(Encoding.UTF8.GetString(e.Body), handlingType);
                success = await HandleMessage(message);
            }
            catch (Exception ex)
            {
                if (ex.Message.Contains("getting value from 'Topic'"))
                {
                    //消息结构不对，拒绝
                    channel.BasicReject(e.DeliveryTag, false);
                    logger.Fatal(new EventData
                    {
                        Type = "RabbitmqMessageTopicException",
                        Message = "Rabbitmq Message Structure Error",
                        Labels = {
                            ["QueueName"] = options.QueueName,
                            ["Topic"] = e.RoutingKey,
                            ["Headers"] = JsonConvert.SerializeObject(e.BasicProperties.Headers),
                            ["Message"] = Encoding.UTF8.GetString(e.Body)
                        }
                    });

                }

            }



            if (success)
            {
                channel.BasicAck(e.DeliveryTag, options.MultipleAck);

                logger.Info(new EventData
                {
                    Type = "RabbitmqReplyMessage",
                    Message = "RabbitMQ Reply Message Completed",
                    Labels = {
                        ["QueueName"] = options.QueueName,
                        ["Topic"] = e.RoutingKey,
                        ["Headers"] = JsonConvert.SerializeObject(e.BasicProperties.Headers),
                        ["Message"] = Encoding.UTF8.GetString(e.Body)
                    }
                });
            }
            else
            {
                if (options.UseDeadLetter)
                {
                    if (!e.KeepAttempts(options.AttemptCount))
                    {
                        //超过死信重试次数
                        logger.Fatal(new EventData
                        {
                            Type = "RabbitmqDeadthRetryMax",
                            Message = "RabbitMQ Deadth retry limit has been exceeded",
                            Labels = {
                            ["QueueName"] = options.QueueName,
                            ["Topic"] = e.RoutingKey,
                            ["Headers"] = JsonConvert.SerializeObject(e.BasicProperties.Headers),
                            ["Message"] = Encoding.UTF8.GetString(e.Body)
                        }
                        });
                        channel.BasicAck(e.DeliveryTag, options.MultipleAck);
                    }
                    else
                    {
                        channel.BasicReject(e.DeliveryTag, false);
                        logger.Warn(new EventData
                        {
                            Type = "RabbitmqRejectMessage",
                            Message = "RabbitMQ Reject Message Completed",
                            Labels = {
                            ["QueueName"] = options.QueueName,
                            ["Topic"] = e.RoutingKey,
                            ["Headers"] = JsonConvert.SerializeObject(e.BasicProperties.Headers),
                            ["Message"] = Encoding.UTF8.GetString(e.Body)
                        }
                        });
                    }
                }
                else
                {
                    channel.BasicNack(e.DeliveryTag, false, requeue: true);
                    logger.Error(new EventData
                    {
                        Type = "RabbitmqBasicNackMessage",
                        Message = "RabbitMQ Message Requeue",
                        Labels = {
                            ["QueueName"] = options.QueueName,
                            ["Topic"] = e.RoutingKey,
                            ["Headers"] = JsonConvert.SerializeObject(e.BasicProperties.Headers),
                            ["Message"] = Encoding.UTF8.GetString(e.Body)
                        }
                    });
                }


            }
        }

        public Task SubscribeAsync<TMessage>(string topic, Func<TMessage, Task> handler)
        {
            handlingType = typeof(TMessage);
            this.handler = (message) =>
            {
                if (message is TMessage)
                {
                    return handler((TMessage)message);
                }
                else
                {
                    throw new Exception("message type error");
                }
            };

            logger.Debug(new EventData
            {
                Type = "RabbitmqSubscribeMessage",
                Message = "RabbitMQ Subscribe Message Completed",
                Labels = {
                    ["QueueName"] = options.QueueName,
                    ["Template"] = topic
                }
            });

            return Task.CompletedTask;
        }

        public void Dispose()
        {
            if (ChannelOpened)
            {
                channel.Close();
            }

            if (ConnectionOpened)
            {
                connection.Close();
            }
        }
    }
}
